package com.shujia.airPM25;

import com.shujia.tuijian.TuiJianDemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapTask;
import org.apache.hadoop.mapred.ReduceTask;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

//20220527:1001 20
//20220527:1001 30
//20220527:1001 40
//...
//class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
//    @Override
//    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
//        // Convert the input line to a String
//        String line = value.toString();
//        String[] strings = line.split(",");
//        // Keep only the rows that carry PM2.5 readings
//        if (strings.length >= 4 && "PM2.5".equals(strings[2])) {
//            for (int i = 3, j = 1001; i < strings.length; i++, j++) {
//                // Basic cleaning: some time slots have no PM2.5 reading recorded, so blanks are replaced with 0
//                if ("".equals(strings[i]) || strings[i] == null || " ".equals(strings[i])) {
//                    strings[i] = "0";
//                }
//
//                context.write(new Text("date: " + strings[0] + "-城市编号:" + j), new LongWritable(Long.parseLong(strings[i])));
//            }
//        }
//    }
//}

/**
 * Reducer that collapses all PM2.5 readings grouped under one key into a single average.
 *
 * <p>Every value received for the key is summed and the total is divided by
 * {@link #HOURS_PER_DAY} rather than by the number of values actually received. The
 * division is integer division, so any fractional part of the average is discarded.
 *
 * <p>NOTE(review): this presumes each key carries one reading per hour of the day
 * (with missing hours contributing 0) — confirm against the mapper feeding this reducer.
 */
class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    /** Fixed divisor used for the daily average: one reading per hour. */
    private static final long HOURS_PER_DAY = 24L;

    /**
     * Sums the readings for {@code key} and emits {@code sum / 24} under the same key.
     *
     * @param key     grouping key produced by the map phase; written back unchanged
     * @param values  readings grouped under {@code key}
     * @param context Hadoop context that receives the single output record
     * @throws IOException          if writing the output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        // Primitive accumulator: a boxed Long would be unboxed and re-boxed on every addition.
        long sum = 0L;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // Divide by 24 to get the PM2.5 average for this key's day (integer division).
        context.write(key, new LongWritable(sum / HOURS_PER_DAY));
    }
}


/**
 * Driver that configures and submits the MapReduce job wiring {@code MyMapper} to
 * {@code MyReducer}.
 *
 * <p>Usage: {@code Pm25Avg <input path> <output path>}
 */
public class Pm25Avg {
    /**
     * Builds the job, submits it, and blocks until it finishes.
     *
     * <p>The process exits with status 2 when the two required paths are not supplied and
     * with status 1 when the job reports failure; on success the method returns normally.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws InterruptedException   if the wait for job completion is interrupted
     * @throws ClassNotFoundException if a class configured on the job cannot be loaded
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Fail fast with a readable message instead of an ArrayIndexOutOfBoundsException
        // raised after the job has already been partially configured.
        if (args == null || args.length < 2) {
            System.err.println("Usage: Pm25Avg <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Pm25Avg.class);
        // A single reduce task, so all results are written by one reducer.
        job.setNumReduceTasks(1);
        job.setJobName("计算每个城市每天的pm2.5平均值");

        // NOTE(review): MyMapper has no active definition in this file (the copy above the
        // reducer is commented out); it is presumably declared elsewhere in this package.
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // Intermediate (map output) key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Final (reduce output) key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job to the cluster and wait; surface a failed job through the exit
        // status so that calling scripts and schedulers can detect it.
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            System.exit(1);
        }
    }
}
